
import java.util.{Locale, Properties}

import org.apache.spark.{SparkConf, SparkContext}
// SparkContext is the main entry point for Spark functionality.
// One SparkContext can be thought of as the lifetime of one Spark application:
// it sets up internal services and establishes the connection to the Spark execution environment.
import org.apache.spark.rdd.RDD
// RDD (Resilient Distributed Dataset) is Spark's fundamental data abstraction:
// an immutable, partitioned collection whose elements can be processed in parallel.
// RDDs provide automatic fault tolerance, locality-aware scheduling and scalability, and
// can be explicitly cached in memory so that subsequent queries reuse the working set.
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.functions.desc
import utils.MySQLUtils

/**
 * CSV columns: ID, Case Number, Date, Block, IUCR, Primary Type, Description,
 * Location Description, Arrest, Domestic, Beat, District, Ward, Community Area,
 * FBI Code, X Coordinate, Y Coordinate, Year, Updated On, Latitude, Longitude, Location.
 */
object SparkSQLCsv {

  /** Default path to the Chicago-crimes CSV; the first command-line argument overrides it. */
  private val DefaultCsvPath = "D:\\Dataset\\Crimes_-_2001_to_20211126.csv"

  /**
   * Expected token count after a naive comma split of one CSV row: the file has
   * 22 columns, and the quoted Location field "(lat, lon)" contains one embedded
   * comma, so a well-formed row yields 23 tokens.
   */
  private val ExpectedFieldCount = 23

  /**
   * One parsed crime record. All fields are kept as raw strings exactly as they
   * appear in the CSV. Location is the two split Location tokens re-joined (the
   * embedded comma is lost); X/Y coordinates and latitude/longitude are dropped.
   */
  case class Data(ID: String, CaseNumber: String, Date: String, Block: String, IUCR: String, PrimaryType: String, Description: String, LocationDescription: String,
                  Arrest: String, Domestic: String, Beat: String, District: String, Ward: String, CommunityArea: String, FBICode: String, Year: String, UpdatedOn: String, Location: String)

  /**
   * Entry point: loads the crimes CSV, runs several aggregate queries and
   * persists each result to MySQL (create table if missing, clear stale rows,
   * then bulk insert through the connection pool).
   *
   * @param args optional; args(0) overrides the default CSV path
   */
  def main(args: Array[String]): Unit = {
    // setAppName is the name shown in the Spark web UI; local[8] runs everything
    // in-process with 8 worker threads.
    val conf = new SparkConf().setAppName("SparkSQLCsv").setMaster("local[8]")
    conf.set("spark.executor.memory", "6G")
      .set("spark.driver.memory", "4G")
      .set("spark.executor.cores", "1")
      .set("spark.yarn.am.waitTime", "1000s")

    // SparkSession is the unified entry point since Spark 2.x: it owns the
    // SparkContext and subsumes the deprecated SQLContext, so neither is
    // constructed by hand anymore.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext

    val parsedData = readDataCsv(sc, args.headOption.getOrElse(DefaultCsvPath))

    // A DataFrame is a Dataset organized into named columns; building it from an
    // RDD of case classes derives the schema by reflection.
    val df = spark.createDataFrame(parsedData)
    // createOrReplaceTempView supersedes the deprecated registerTempTable.
    df.createOrReplaceTempView("data")

    println("The data read from the file: ")
    println(parsedData.count())
    println

    /** Persist one result table: create it if missing, clear stale rows, bulk-insert. */
    def persist(table: String, result: DataFrame): Unit = {
      MySQLUtils.saveDFtoDBCreateTableIfNotExist(table, result)
      MySQLUtils.deleteData(table)
      MySQLUtils.saveDFtoDBUsePool(table, result)
    }

    // Count of each crime type per year.
    val res1 = df.groupBy("PrimaryType", "Year").count().orderBy("Year", "PrimaryType")
    persist("res1", res1)

    // Total crimes per year.
    val res2_1 = df.groupBy("Year").count().orderBy("Year")
    println("res2_1....start..")
    // collect() pulls the rows back to the driver; acceptable here because the
    // result is one row per year.
    res2_1.collect().foreach(println)
    println("res2_1....end..")

    // Arrests per year. NOTE(review): "Arrest == True" compares the string column
    // against a boolean literal and relies on Spark SQL's implicit cast — confirm
    // the Arrest column values ("true"/"True") actually match, otherwise this
    // filter silently drops everything.
    val res2_2 = df.where("Arrest == True").groupBy("Year").count().orderBy("Year")

    persist("res2_1", res2_1)
    persist("res2_2", res2_2)

    // Count per crime type, most frequent first.
    val res3 = df.groupBy("PrimaryType").count().orderBy(desc("count"))
    persist("res3", res3)

    // Locations with the highest crime counts.
    val res4 = df.groupBy("LocationDescription").count().orderBy(desc("count"))
    persist("res4", res4)

    // Count of each crime type per location.
    val res5 = df.groupBy("PrimaryType", "LocationDescription").count().sort("PrimaryType", "LocationDescription")
    persist("res5", res5)

    // Yearly counts for theft and for homicide.
    val res6_1 = df.filter(df.col("PrimaryType").equalTo("THEFT")).groupBy("Year").count().orderBy("Year")
    val res6_2 = df.filter(df.col("PrimaryType").equalTo("HOMICIDE")).groupBy("Year").count().orderBy("Year")
    persist("res6_1", res6_1)
    persist("res6_2", res6_2)

    // Yearly counts for criminal sexual assault and sex offenses, combined.
    val res6_3 = df.filter(df.col("PrimaryType").equalTo("CRIM SEXUAL ASSAULT"))
      .union(df.filter(df.col("PrimaryType").equalTo("SEX OFFENSE")))
      .groupBy("PrimaryType", "Year").count().orderBy("PrimaryType", "Year")
    persist("res6_3", res6_3)

    // Stopping the session also stops the underlying SparkContext.
    spark.stop()
  }

  /**
   * Reads the crimes CSV into an RDD of [[Data]].
   *
   * Parsing is a naive comma split, so only rows that produce exactly
   * [[ExpectedFieldCount]] tokens survive; rows with extra embedded commas in
   * other quoted fields are dropped. Rows containing empty fields are dropped
   * and exact duplicate lines are removed.
   *
   * @param sc   context used to read the file
   * @param path CSV location (local path or any Hadoop-supported URI);
   *             defaults to [[DefaultCsvPath]] for backward compatibility
   * @return one [[Data]] record per clean, distinct CSV row
   */
  private def readDataCsv(sc: SparkContext, path: String = DefaultCsvPath): RDD[Data] = {
    val data = sc.textFile(path)
    // first() returns the header row; it is filtered out below by exact match.
    val header = data.first()

    val cleanData = data
      .filter(line => !line.equals(header))
      .filter { line =>
        val fields = line.split(",")
        // Keep only rows with the expected token count where columns 0-20 are all
        // non-empty; the two Location tokens (21, 22) only need to be non-empty
        // combined, matching the original validation exactly.
        fields.length == ExpectedFieldCount &&
          (0 to 20).forall(i => fields(i).nonEmpty) &&
          (fields(21) + fields(22)).nonEmpty
      }

    // distinct() removes duplicate raw lines before parsing.
    cleanData.distinct().map { line =>
      val f = line.split(",")
      Data(
        ID = f(0), CaseNumber = f(1), Date = f(2), Block = f(3), IUCR = f(4),
        PrimaryType = f(5), Description = f(6), LocationDescription = f(7),
        Arrest = f(8), Domestic = f(9), Beat = f(10), District = f(11),
        Ward = f(12), CommunityArea = f(13), FBICode = f(14),
        // Columns 15/16 (X/Y coordinate) and 19/20 (latitude/longitude) are
        // intentionally skipped; they are not part of the Data schema.
        Year = f(17), UpdatedOn = f(18),
        // Re-join the Location tokens that the naive split broke apart.
        Location = f(21) + f(22)
      )
    }
  }

  /**
   * Builds JDBC connection properties for writing to MySQL.
   *
   * NOTE(review): currently dead code — the properties are constructed but never
   * used or returned, and nothing is written. Kept with its original signature;
   * either wire it into a df.write.jdbc(...) call or remove it.
   */
  private def writeDataToMysql(): Unit = {
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "niit")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    props.setProperty("numPartitions", "10")
  }
}
